[Connector-V2] [Clickhouse] Improve Clickhouse File Connector #3416
Co-authored-by: Eric <gaojun2048@gmail.com>
...org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
...rg/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
@@ -44,6 +46,6 @@ public String factoryIdentifier() {
    @Override
    public OptionRule optionRule() {
Please add a unit test (UT) for optionRule().
# Conflicts:
#	seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
#	seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
...org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
        .collect(Collectors.joining("\t")) + "\n";
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
        data.getBytes(StandardCharsets.UTF_8).length);
Putting every row into a buffer inside an mmap region of the file channel may cause the executor to consume too much RAM and then crash with an OOM error. In my opinion, the Clickhouse File Connector is designed for huge data loading scenarios where every executor handles multiple GB of data, and it is hard to keep all of that data in memory.
Maybe we can make a tradeoff between speed and memory capacity: we can dump rows to disk once they accumulate to a certain threshold.
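The threshold idea above could be sketched roughly as follows. This is only an illustration under stated assumptions: `ThresholdRowWriter`, its constructor parameters, and `pendingBytes()` are hypothetical names that do not exist in the SeaTunnel code base, and rows are assumed to arrive as a list of already-stringified fields.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper (not part of SeaTunnel): buffers tab-separated rows in memory
// and dumps them to disk once the buffered size reaches a configurable threshold.
final class ThresholdRowWriter implements AutoCloseable {
    private final FileChannel channel;
    private final int flushThresholdBytes;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    ThresholdRowWriter(Path file, int flushThresholdBytes) throws IOException {
        this.channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        this.flushThresholdBytes = flushThresholdBytes;
    }

    // Encode one row as a TSV line and keep it in memory until the threshold is hit.
    void write(List<String> fields) throws IOException {
        byte[] line = (String.join("\t", fields) + "\n").getBytes(StandardCharsets.UTF_8);
        pending.write(line, 0, line.length);
        if (pending.size() >= flushThresholdBytes) {
            flush();
        }
    }

    // Write all buffered bytes to the file, then clear the in-memory buffer.
    void flush() throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(pending.toByteArray());
        while (buffer.hasRemaining()) {
            channel.write(buffer); // a single write() call may write only part of the buffer
        }
        pending.reset();
    }

    // Number of bytes currently held in memory and not yet written to disk.
    int pendingBytes() {
        return pending.size();
    }

    @Override
    public void close() throws IOException {
        flush();
        channel.close();
    }
}
```

With this shape, memory use per writer stays bounded by roughly the threshold plus one row, at the cost of an extra copy on each flush. The right threshold would have to be measured against real workloads.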
Hi, can you tell me more about how mmap consumes too much RAM? We map only one row at a time. Is there anything I'm missing?
I am not very sure about the behavior of the file channel.
But when I use fileChannel.map, the Spark executors always crash. I currently use the following code as a workaround.
ByteBuffer buffer = ByteBuffer.allocate(data.size());
buffer.put(out.toByteArray());
// flip offset
buffer.flip();
fileChannel.write(buffer);
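For comparison, the two approaches discussed in this thread can be put side by side in a self-contained sketch. `RowAppendSketch` and its method names are hypothetical, and the snippet above refers to `data` and `out` variables whose definitions are not shown, so this version takes the row as a plain `String` instead. One thing worth keeping in mind: a `MappedByteBuffer` stays mapped until the buffer object is garbage collected, so mapping a new region per row may leave many small mappings alive. Whether that is what crashed the executors is not confirmed here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical comparison of the two ways of appending a row discussed in this thread.
final class RowAppendSketch {
    private RowAppendSketch() {
    }

    // Approach under review: map a region the size of one row at the end of the file.
    // The channel must be open for both reading and writing.
    static void appendWithMap(FileChannel channel, String row) throws IOException {
        byte[] bytes = row.getBytes(StandardCharsets.UTF_8);
        MappedByteBuffer mapped =
                channel.map(FileChannel.MapMode.READ_WRITE, channel.size(), bytes.length);
        mapped.put(bytes);
    }

    // Workaround: wrap the row bytes in a heap buffer and write them at the end of the file.
    static void appendWithWrite(FileChannel channel, String row) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(row.getBytes(StandardCharsets.UTF_8));
        channel.position(channel.size());
        while (buffer.hasRemaining()) {
            channel.write(buffer); // loop because write() may be partial
        }
    }
}
```

Both methods produce the same bytes on disk. They differ in where the memory lives (mapped pages outside the Java heap versus a short-lived heap buffer) and in when that memory is released.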
Can you provide your crash log? Thanks!
Co-authored-by: hailin0 <hailin088@gmail.com>